package com.jasongj.kafka.consumer;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Demo Kafka consumer that disables auto-commit and commits offsets manually.
 *
 * <p>Record values are accumulated in an in-memory buffer. Once the buffer holds at least
 * {@link #MIN_BATCH_SIZE} entries it is appended to a per-minute file and, only if that write
 * succeeded, the consumer's offsets are committed. Records that were polled but not yet written
 * are therefore never committed, so they are consumed again after a restart.
 */
public class ManualCommitConsumerDemo {

    /** Number of buffered records that triggers a flush to disk. */
    private static final int MIN_BATCH_SIZE = 20000;

    /** Root output directory; one sub-directory per hour is created beneath it. */
    private static final String LOG_DIR = "d:\\log_dir\\";

    /**
     * Entry point.
     *
     * @param args either empty (built-in demo settings are used) or exactly four values:
     *             bootstrap server, topic name, group id, client id
     */
    public static void main(String[] args) {
        // Fall back to the demo settings only when nothing was supplied, so that real
        // command-line arguments are honoured and the usage check below is reachable.
        if (args == null || args.length == 0) {
            args = new String[]{"localhost:9092", "topic1", "test-consumer-group", "consumer3"};
        }
        if (args.length != 4) {
            System.err.println(
                    "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}");
            System.exit(1);
        }
        String bootstrap = args[0];
        String topic = args[1];
        String groupid = args[2];
        String clientid = args[3];

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupid);
        props.put("client.id", clientid);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("auto.offset.reset", "earliest");
        props.put("deserializer.encoding", "UTF-8");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList(topic));

            List<String> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // NOTE(review): "\n\r" looks like a reversed CRLF, and writeFile adds another
                    // line separator after each entry; kept as-is to preserve the output format.
                    String line = record.value() + "\n\r";
                    System.out.println(line);
                    buffer.add(line);
                }
                // Flush only between polls: at this point every record handed out by the
                // consumer so far is in the buffer (or already on disk), so committing the
                // consumer's position after a successful write cannot skip unwritten records.
                if (buffer.size() >= MIN_BATCH_SIZE) {
                    flushAndCommit(consumer, buffer);
                }
            }
        } finally {
            // Reached only when the loop exits via an exception; offsets of records still
            // sitting in the buffer are deliberately left uncommitted.
            consumer.close();
        }
    }

    /**
     * Writes the buffered lines to disk and, if that succeeded, commits the consumer's offsets
     * and empties the buffer. On a write failure nothing is committed and the buffer is kept,
     * so the same lines are retried on the next flush (the buffer keeps growing until a write
     * succeeds).
     *
     * @param consumer consumer whose offsets are committed after a successful write
     * @param buffer   lines collected since the last successful flush; cleared on success
     */
    private static void flushAndCommit(KafkaConsumer<String, String> consumer, List<String> buffer) {
        try {
            writeFile(buffer);
        } catch (IOException e) {
            System.err.println("Failed to write " + buffer.size()
                    + " buffered records; offsets not committed, will retry after the next poll");
            e.printStackTrace();
            return;
        }
        consumer.commitSync();
        buffer.clear();
    }

    /**
     * Appends every buffered line, each followed by the platform line separator, to the file
     * {@code <LOG_DIR>/<yyyy-MM-dd-HH>/<yyyy-MM-dd-HH-mm>} derived from the current time.
     * The file is opened in append mode so several batches flushed within the same minute
     * do not overwrite each other, and is written as UTF-8.
     *
     * @param buffer lines to write; nothing is written when it is empty
     * @throws IOException if the hour directory cannot be created or the file cannot be written
     */
    private static void writeFile(List<String> buffer) throws IOException {
        if (buffer.isEmpty()) {
            return;
        }

        Date now = new Date();
        String hour = new SimpleDateFormat("yyyy-MM-dd-HH").format(now);
        String minute = new SimpleDateFormat("yyyy-MM-dd-HH-mm").format(now);

        File dir = new File(LOG_DIR + hour + "\\");
        File file = new File(LOG_DIR + hour + "\\" + minute);

        // Create the hour directory (and any missing parents) if it does not exist yet.
        // The second isDirectory() check tolerates another process creating it concurrently.
        if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Could not create directory " + dir);
        }

        // FileOutputStream creates the file when missing; 'true' selects append mode.
        try (BufferedWriter bw = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file, true), StandardCharsets.UTF_8))) {
            for (String line : buffer) {
                bw.write(line);
                bw.newLine();
            }
        }
    }
}
